# This code generates cz level sample counts with different kinds of accounts, e.g.
# people have tradelines, people have public records, people without any account, etc.



# Load packages

import pyspark.sql.functions as sqlf
import pandas as pd
import numpy  as np
import os
import sys
import shutil
from pyspark     import SparkContext, SparkConf
from pyspark.sql import SQLContext
from SparkDataTools2 import *


# Set spark working path
WORKDIR = '/geo_debt/geo/Spark'
CODEDIR = '/geo_debt/geo/Code/1_Spark/'
OUTDIR  = '/geo_debt/geo/Output/Table'


#cross walk
CW      = 'parquet.`/geo_debt/geo/cw/cw_zip_cz`'

#go to owrking directory
os.chdir(WORKDIR)

#########################
###       Main        ###
#########################

yList = range(2015, 2016)
m   = 6
geo_level = 'cz'

for y in yList:

    headersData = table_setup(sqlContext, y, m, 'headers')
    headersData.createOrReplaceTempView("headersData")

    tradesData  = table_setup(sqlContext, y, m, 'trades')
    tradesData.createOrReplaceTempView("tradesData")

    collectionsData = table_setup(sqlContext, y, m, 'collections')
    collectionsData.createOrReplaceTempView("collectionsData")

    pubrecsData = table_setup(sqlContext, y, m, 'pubrecs')
    pubrecsData.createOrReplaceTempView("pubrecsData")

    ym = yyyymm(y,m)
    print("^^^ "+ym+" ^^^")

    base = "SELECT " +\
            "h.subjectKey AS subjectKey, " +\
            "h.zip AS zip, " +\
            "CASE WHEN t.subjectKey IS NULL THEN 0 ELSE 1 END AS has_trades, "+\
            "CASE WHEN c.subjectKey IS NULL THEN 0 ELSE 1 END AS has_colls, "+\
            "CASE WHEN p.subjectKey IS NULL THEN 0 ELSE 1 END AS has_pubrecs, "+\
            "CASE WHEN t.subjectKey IS NULL AND c.subjectKey IS NULL AND p.subjectKey IS NULL THEN 1 ELSE 0 END AS has_nothing, "+\
            "CASE WHEN t.subjectKey IS NULL AND (c.subjectKey IS NOT NULL OR p.subjectKey IS NOT NULL) THEN 1 ELSE 0 END AS hasno_trades_has_collORpubr "+\
          "FROM headersData h " +\
               "LEFT JOIN (SELECT DISTINCT subjectKey FROM tradesData) t ON h.subjectKey=t.subjectKey " +\
               "LEFT JOIN (SELECT DISTINCT subjectKey FROM collectionsData) c ON h.subjectKey=c.subjectKey " +\
               "LEFT JOIN (SELECT DISTINCT subjectKey FROM pubrecsData) p ON h.subjectKey=p.subjectKey " +\
          "WHERE h.birthDate IS NOT NULL AND FLOOR((h.asOfDate - h.birthDate)/10000) BETWEEN 20 AND 80 "

    if geo_level == 'ind':
        sql = "SELECT COUNT(*) AS total, SUM(has_trades) AS has_trades, SUM(has_colls) AS has_colls, "+\
              "SUM(has_pubrecs) AS has_pubrecs, SUM(has_nothing) AS has_nothing, SUM(hasno_trades_has_collORpubr) AS hasno_trades_has_collORpubr " +\
              "FROM (" + base +") "
        sqlContext.sql(sql).show()
    elif geo_level == 'cz':
        sql = "SELECT cw.cz, FIRST(cw.czname) AS czname, FIRST(cw.statefip) AS statefip, FIRST(cw.stateabbr) AS stateabbr, FIRST(cw.county) AS county, "+\
              "COUNT(a.subjectKey) AS total, SUM(a.has_trades) AS has_trades, SUM(a.has_colls) AS has_colls, "+\
              "SUM(a.has_pubrecs) AS has_pubrecs, SUM(a.has_nothing) AS has_nothing, SUM(a.hasno_trades_has_collORpubr) AS hasno_trades_has_collORpubr " +\
              "FROM (" + base +") a LEFT JOIN " + CW + " cw ON a.zip=cw.zip " +\
              "GROUP BY cw.cz "
        saveStata(sql,'June2015_count_cz', WORKDIR, OUTDIR)


sqlContext.clearCache()

print(' !!!! SUCCESS !!!!')
